{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Incrementally Train Large Datasets\n",
    "==================================\n",
    "\n",
    "We can train models on large datasets one batch at a time.  Many Scikit-Learn estimators implement a `partial_fit` method to enable incremental learning in batches.  \n",
    "\n",
    "```python\n",
    "est = SGDClassifier(...)\n",
    "est.partial_fit(X_train_1, y_train_1)\n",
    "est.partial_fit(X_train_2, y_train_2)\n",
    "...\n",
    "```\n",
    "\n",
    "The Scikit-Learn documentation discusses this approach in more depth in their [user guide](http://scikit-learn.org/stable/modules/scaling_strategies.html).\n",
    "\n",
    "This notebook demonstrates the use of Dask-ML's `Incremental` meta-estimator, which automates the use of Scikit-Learn's `partial_fit` over Dask arrays and dataframes. Scikit-Learn handles all of the computation while Dask handles the data management, loading and moving batches of data as necessary. This allows scaling to large datasets distributed across many machines, or to datasets that do not fit in memory, all with a familiar workflow.\n",
    "\n",
    "This example shows ...\n",
    "\n",
    "* wrapping a Scikit-Learn estimator that implements `partial_fit` with the [Dask-ML Incremental](http://ml.dask.org/modules/generated/dask_ml.wrappers.Incremental.html#dask_ml.wrappers.Incremental) meta-estimator\n",
    "* training, predicting, and scoring on this wrapped estimator\n",
    "\n",
    "Although this example uses Scikit-Learn's SGDClassifer, the `Incremental` meta-estimator will work for any class that implements `partial_fit` and the [scikit-learn base estimator API].\n",
    "\n",
    "<img src=\"http://scikit-learn.org/stable/_static/scikit-learn-logo-small.png\"> <img src=\"https://www.continuum.io/sites/default/files/dask_stacked.png\" width=\"100px\">\n",
    "\n",
    "[scikit-learn base estimator API]:http://scikit-learn.org/stable/modules/generated/sklearn.base.BaseEstimator.html\n",
    "\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Setup Dask\n",
    "\n",
    "We first start a Dask client in order to get access to the Dask dashboard, which will provide progress and performance metrics. \n",
    "\n",
    "You can view the dashboard by clicking on the dashboard link after you run the cell"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from dask.distributed import Client\n",
    "client = Client(n_workers=4, threads_per_worker=1)\n",
    "client"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Create Data\n",
    "\n",
    "We create a synthetic dataset that is large enough to be interesting, but small enough to run quickly.  \n",
    "\n",
    "Our dataset has 1,000,000 examples and 100 features."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import dask\n",
    "import dask.array as da\n",
    "from dask_ml.datasets import make_classification\n",
    "\n",
    "\n",
    "n, d = 100000, 100\n",
    "\n",
    "X, y = make_classification(n_samples=n, n_features=d,\n",
    "                           chunks=n // 10, flip_y=0.2)\n",
    "X"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "For more information on creating dask arrays and dataframes from real data, see documentation on [Dask arrays](https://dask.pydata.org/en/latest/array-creation.html) and [Dask dataframes](https://dask.pydata.org/en/latest/dataframe-create.html)."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Split data for training and testing\n",
    "\n",
    "We split our dataset into training and testing data to aid evaluation by making sure we have a fair test:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from dask_ml.model_selection import train_test_split\n",
    "X_train, X_test, y_train, y_test = train_test_split(X, y)\n",
    "X_train"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Persist data in memory\n",
    "\n",
    "This dataset is small enough to fit in distributed memory, so we call `dask.persist` to ask Dask to execute the computations above and keep the results in memory."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "X_train, X_test, y_train, y_test = dask.persist(X_train, X_test, y_train, y_test)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "If you are working in a situation where your dataset does not fit in memory then you should skip this step.  Everything will still work, but will be slower and use less memory.\n",
    "\n",
    "Calling `dask.persist` will preserve our data in memory, so no computation will be needed as we pass over our data many times.  For example if our data came from CSV files and was not persisted, then the CSV files would have to be re-read on each pass.  This is desirable if the data does not fit in RAM, but not slows down our computation otherwise."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Precompute classes\n",
    "\n",
    "We pre-compute the classes from our training data, which is required for this classification example:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "classes = da.unique(y_train).compute()\n",
    "classes"
   ]
  },
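  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The classes are needed up front because a single batch is not guaranteed to contain every label, so Scikit-Learn's `partial_fit` requires the full label set on its first call. The following is a minimal sketch of that behavior in plain NumPy and Scikit-Learn, outside of Dask; the toy arrays `X_batch` and `y_batch` are made up for illustration and are not part of this notebook's dataset.\n",
    "\n",
    "```python\n",
    "import numpy as np\n",
    "from sklearn.linear_model import SGDClassifier\n",
    "\n",
    "# Hypothetical toy batch that happens to contain only label 0\n",
    "rng = np.random.RandomState(0)\n",
    "X_batch = rng.normal(size=(20, 5))\n",
    "y_batch = np.zeros(20, dtype=int)\n",
    "\n",
    "clf = SGDClassifier()\n",
    "try:\n",
    "    # First call without `classes`: the label set is unknown, so this raises\n",
    "    clf.partial_fit(X_batch, y_batch)\n",
    "    first_call_failed = False\n",
    "except ValueError:\n",
    "    first_call_failed = True\n",
    "\n",
    "# Passing the full label set lets the model account for both classes\n",
    "clf.partial_fit(X_batch, y_batch, classes=np.array([0, 1]))\n",
    "known_classes = clf.classes_\n",
    "```\n",
    "\n",
    "Computing `classes` once from the whole training array, as in the cell above, means every chunk is fitted against the same label set."
   ]
  },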
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Create Scikit-Learn model\n",
    "\n",
    "We make the underlying Scikit-Learn estimator, an `SGDClassifier`:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from sklearn.linear_model import SGDClassifier\n",
    "\n",
    "est = SGDClassifier(loss='log', penalty='l2', tol=1e-3)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Here we use `SGDClassifier`, but any estimator that implements the `partial_fit` method will work.  A list of Scikit-Learn models that implement this API is available [here](http://scikit-learn.org/stable/modules/scaling_strategies.html#incremental-learning).\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Wrap with Dask-ML's Incremental meta-estimator\n",
    "\n",
    "We now wrap our `SGDClassifer` with the [`dask_ml.wrappers.Incremental`](http://ml.dask.org/modules/generated/dask_ml.wrappers.Incremental.html#dask_ml.wrappers.Incremental) meta-estimator."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from dask_ml.wrappers import Incremental\n",
    "\n",
    "inc = Incremental(est, scoring='accuracy')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Recall that `Incremental` only does data management while leaving the actual algorithm to the underlying Scikit-Learn estimator.\n",
    "\n",
    "Note: We set the scoring parameter above in the Dask estimator to tell it to handle scoring.  This works better when using Dask arrays for test data."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Model training\n",
    "\n",
    "`Incremental` implements a `fit` method, which will perform one loop over the dataset, calling `partial_fit` over each chunk in the Dask array.\n",
    "\n",
    "You may want to watch the dashboard during this fit process to see the sequential fitting of many batches."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "inc.fit(X_train, y_train, classes=classes)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "inc.score(X_test, y_test)"
   ]
  },
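  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Conceptually, the `fit` call above behaves roughly like the loop below: each block of rows is handed to the estimator's `partial_fit` in turn, one after another. This is a simplified sketch in plain NumPy and Scikit-Learn, not Dask-ML's actual implementation; the helper `fit_in_batches` and the toy data are hypothetical.\n",
    "\n",
    "```python\n",
    "import numpy as np\n",
    "from sklearn.linear_model import SGDClassifier\n",
    "\n",
    "\n",
    "def fit_in_batches(est, X, y, classes, n_batches):\n",
    "    # Hypothetical helper: one sequential pass over row-wise blocks\n",
    "    for X_block, y_block in zip(np.array_split(X, n_batches),\n",
    "                                np.array_split(y, n_batches)):\n",
    "        est.partial_fit(X_block, y_block, classes=classes)\n",
    "    return est\n",
    "\n",
    "\n",
    "# Toy stand-in for the chunked training data\n",
    "rng = np.random.RandomState(0)\n",
    "X_toy = rng.normal(size=(1000, 10))\n",
    "y_toy = (X_toy[:, 0] > 0).astype(int)\n",
    "\n",
    "model = fit_in_batches(SGDClassifier(random_state=0), X_toy, y_toy,\n",
    "                       classes=np.array([0, 1]), n_batches=10)\n",
    "toy_accuracy = model.score(X_toy, y_toy)\n",
    "```\n",
    "\n",
    "In the Dask version, each block is a chunk of the Dask array, and Dask takes care of loading and moving those chunks as necessary."
   ]
  },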
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Pass over the training data many times\n",
    "\n",
    "Calling `.fit` passes over all chunks our data once.  However, in many cases we may want to pass over the training data many times.  To do this we can use the `Incremental.partial_fit` method and a for loop."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "est = SGDClassifier(loss='log', penalty='l2', tol=0e-3)\n",
    "inc = Incremental(est, scoring='accuracy')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "for _ in range(10):\n",
    "    inc.partial_fit(X_train, y_train, classes=classes)\n",
    "    print('Score:', inc.score(X_test, y_test))    "
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Predict and Score\n",
    "\n",
    "Finally we can also call `Incremental.predict` and `Incremental.score` on our testing data "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "inc.predict(X_test)  # Predict produces lazy dask arrays"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "inc.predict(X_test)[:100].compute()  # call compute to get results"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "inc.score(X_test, y_test)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Learn more\n",
    "\n",
    "In this notebook we went over using Dask-ML's `Incremental` meta-estimator to automate the process of incremental training with Scikit-Learn estimators that implement the `partial_fit` method.  If you want to learn more about this process you might want to investigate the following documentation:\n",
    "\n",
    "1.  http://scikit-learn.org/stable/modules/scaling_strategies.html\n",
    "2.  [Dask-ML Incremental API documentation](http://ml.dask.org/modules/generated/dask_ml.wrappers.Incremental.html#dask_ml.wrappers.Incremental)\n",
    "3.  [List of Scikit-Learn estimators compatible with Dask-ML's Incremental](http://scikit-learn.org/stable/modules/scaling_strategies.html#incremental-learning)\n",
    "4. For more info on the train-test split for model evaluation, see [Hyperparameters and Model Validation](https://jakevdp.github.io/PythonDataScienceHandbook/05.03-hyperparameters-and-model-validation.html)."
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.4"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
